package ins.framework.kafkastream.processor;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

import java.util.HashMap;
import java.util.Map;

/**
 * Minimal Kafka Streams demo built with the low-level Processor API.
 *
 * <p>Wires one source topic into a chain of two processors
 * ({@code ProcessorA} followed by {@code ProcessorB}) and runs the resulting
 * topology until the JVM is shut down.
 */
public class StreamDemo {

    /**
     * Builds the topology, starts the streams application and registers a
     * shutdown hook so the application is closed when the JVM exits.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Streams configuration: application id and broker address.
        StreamsConfig config = new StreamsConfig(
                StreamDemo.connection("streamtopic3p_group_topo3", "10.10.1.7"));

        // Topology: source topic -> processor_a -> processor_b.
        // API reference: https://docs.confluent.io/4.1.1/streams/javadocs/index.html
        // No sink node is attached; sinks can be added with Topology#addSink if
        // processed records need to be written back to Kafka.
        Topology topology = new Topology();
        topology.addSource("source_name", "streamtopic3p")
                .addProcessor("processor_a", ProcessorA::new, "source_name")
                .addProcessor("processor_b", ProcessorB::new, "processor_a");

        // Create the KafkaStreams instance from the topology and the configuration.
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        // Register the hook before starting so that close() is invoked on JVM
        // shutdown (e.g. Ctrl-C / SIGTERM) instead of abandoning the running
        // stream threads without a clean stop.
        Runtime.getRuntime().addShutdownHook(
                new Thread(kafkaStreams::close, "stream-demo-shutdown-hook"));

        // Start processing.
        kafkaStreams.start();
    }

    /**
     * Builds the Kafka Streams configuration map used by this demo.
     *
     * @param groupName value stored under {@link StreamsConfig#APPLICATION_ID_CONFIG}
     * @param ip        broker host; {@code ":9092"} is appended to form the
     *                  bootstrap server address
     * @return a new mutable map holding the application id, bootstrap servers,
     *         state directory and the default String key/value serdes
     */
    public static Map<String, Object> connection(String groupName, String ip) {
        Map<String, Object> properties = new HashMap<>();
        // Application id; per the original author's note, a folder for it
        // (holding a .lock file) is created under the state directory.
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, groupName);
        // Kafka cluster to connect to.
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ip + ":9092");
        // Directory under which the files belonging to this application id are created.
        // NOTE(review): hard-coded Windows path; confirm before running elsewhere.
        properties.put(StreamsConfig.STATE_DIR_CONFIG, "E:\\kafka-stream");
        // Default key serializer / deserializer.
        properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Default value serializer / deserializer.
        properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return properties;
    }

}
